2.09. RabbitMQ
RabbitMQ
RabbitMQ — это популярный брокер сообщений (Message Broker), который реализует протокол AMQP. Он используется для асинхронного обмена данными между приложениями через очереди. RabbitMQ обеспечивает надёжную доставку сообщений, масштабируемость и гибкость. Как почтовое отделение, точно доставляющее по адресам.
Официальный сайт - https://www.rabbitmq.com/
Сообщения хранятся в очередях до тех пор, пока не будут обработаны. Производитель отправляет сообщение в очередь, а потребитель забирает его позже.

Порой сервисы нужно отделить друг от друга, обеспечить доставку сообщений между ними даже при падении системы, и при этом управлять нагрузкой и очередями задач. Для этого «кролик» использует простой принцип:
- Производитель отправляет сообщение.
- Обменник получает сообщение и решает, куда его направить.
- Сообщение помещается в очередь.
- Потребитель забирает сообщение из очереди.
Сообщение гарантированно не потеряется: оно остаётся в очереди, пока не будет подтверждено потребителем.
RabbitMQ использует модель «производитель-потребитель» с промежуточным хранилищем — очередью:
- Producer (Производитель) отправляет сообщения в RabbitMQ, может быть любым приложением или системой;
- Exchange (Обменник) принимает сообщения от производителя и определяет, в какую очередь поместить сообщение, исходя из правил маршрутизации;
- Queue (Очередь) хранит сообщения до тех пор, пока они не будут обработаны потребителем.
- Consumer (Потребитель) забирает сообщения из очереди и обрабатывает их согласно логике приложения.

Производителем может быть веб-сервер, сервис оплаты, IoT-устройство - это приложение, которое отправляет сообщения в RabbitMQ.
Сообщение - это набор данных. Оно не попадает сразу в очередь, оно проходит через обменник, и именно он решает, в какую именно очередь поместить сообщение.
Queue (Очередь) представляет собой буфер, который хранит сообщения. Очереди очень надёжны, и сообщения можно сделать устойчивыми к перезагрузкам (durable), чтобы не потеряться при падении брокера.
Binding (Привязка) - правило, которое связывает обменник с очередью, указывая, какие сообщения нужно направлять в какую очередь. Может использовать routing key или шаблоны (в случае topic exchange).
Routing Key (Ключ маршрутизации) - это метка, которую производитель добавляет к сообщению. Обменник использует её, чтобы решить, куда направить сообщение.
Брокером в RabbitMQ называется сам сервер, который принимает, хранит и доставляет сообщения. Фактически, всю эту систему целиком можно называть RabbitMQ.
Exchange (Обменник) является точкой входа для сообщений от производителя. Обменник сам решает, куда отправить сообщение дальше, на основе правил привязки (binding), ключа маршрутизации (routing key) и типа обменника.
RabbitMQ поддерживает несколько типов обменников (exchanges), которые определяют правила маршрутизации сообщений:
- Direct - сообщение отправляется в очередь, которая соответствует ключу маршрутизации;
- Fanout - сообщение рассылается во все очереди, связанные с этим обменником;
- Topic - сообщение отправляется в очереди, соответствующие шаблону ключа маршрутизации;
Headers - маршрутизация основана на заголовках сообщения, а не на ключе.
Message Acknowledgment (ACK) - механизм подтверждения: потребитель говорит «я обработал», и сообщение удаляется. Если не подтвердил — оно возвращается в очередь.
Quorum Queues - современный тип очереди, обеспечивающий высокую доступность и отказоустойчивость за счёт репликации и согласованности (как в распределённых системах).
Streams - новый режим работы RabbitMQ (начиная с версии 3.9), позволяющий работать с данными как с логами событий (похоже на Kafka). Подходит для потоковой передачи (streaming), когда нужно читать историю сообщений.
Virtual Host — это изолированное пространство внутри RabbitMQ, которое позволяет разделять ресурсы (очереди, обменники, пользователи) между различными проектами или командами. Каждый Virtual Host имеет свои собственные очереди, обменники и разрешения.
Создать Virtual Host можно через консоль или веб-интерфейс. RabbitMQ предоставляет встроенный веб-интерфейс для мониторинга и управления брокером. Этот интерфейс называется RabbitMQ Management, который представляет собой плагин rabbitmq_management.
RabbitMQ Management по умолчанию доступен по адресу http://localhost:15672 под учётными данными guest/guest (логин/пароль). Интерфейс позволяет просматривать статистику (количество сообщений, скорость обработки, использование памяти), состояние очередей, обменников и соединений, создавать и удалять очереди, обменники.
Имеются и расширения функциональности (плагины). Например:
- rabbitmq_management — веб-интерфейс,
- rabbitmq_shovel — пересылка сообщений между брокерами,
- rabbitmq_federation — объединение нескольких RabbitMQ в сеть.
Как настроить RabbitMQ?
- Установка Erlang на сервер — это зависимость для RabbitMQ (он работает поверх Erlang). Пример:
sudo apt update
sudo apt install erlang
- Установка RabbitMQ. Сначала нужно добавить официальный репозиторий RabbitMQ:
sudo apt-get install curl gnupg
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmor > /usr/share/keyrings/rabbitmq-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt update
А затем установить RabbitMQ:
sudo apt install rabbitmq-server
- Запуск RabbitMQ. Для этого нужно запустить службу RabbitMQ:
sudo systemctl start rabbitmq-server
Желательно также включить автозапуск RabbitMQ при загрузке системы:
sudo systemctl enable rabbitmq-server
Чтобы убедиться в корректности запуска, можно проверить статус:
sudo systemctl status rabbitmq-server
- Создание виртуального хоста для изоляции проектов:
sudo rabbitmqctl add_vhost my_vhost
- Настройка прав доступа к виртуальному хосту.
Создание пользователя:
sudo rabbitmqctl add_user my_user my_password
Назначение прав на виртуальный хост:
sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
- Включение плагина управления.
Активировать веб-интерфейс:
sudo rabbitmq-plugins enable rabbitmq_management
Веб-интерфейс будет доступен по адресу http://localhost:15672
- Подключение к RabbitMQ.
В разных языках программирования используются соответствующие клиентские библиотеки. Нужно добавить к проекту программы библиотеку, а затем:
- создать соединение с RabbitMQ, указав хост, порт и учётные данные;
- создать канал (логическое соединение внутри физического соединения - все операции выполняются через канал);
- создать обменник с указанием типа;
- создать очередь с уникальным именем;
- связать обменник с очередью, указав ключ маршрутизации.
7.1. C# - библиотека RabbitMQ.Client.
Пример использования:
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// Создание очереди
channel.QueueDeclare(queue: "my_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
// Отправка сообщения
string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "my_queue", basicProperties: null, body: body);
Console.WriteLine("Message sent");
}
7.2. Java - библиотека amqp-client. Пример использования:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class RabbitMQExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Создание очереди
channel.queueDeclare("my_queue", false, false, false, null);
// Отправка сообщения
String message = "Hello, RabbitMQ!";
channel.basicPublish("", "my_queue", null, message.getBytes());
System.out.println("Message sent");
}
}
}
7.3. Python - библиотека pika. Пример использования:
import pika
# Подключение к RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='my_queue')
# Отправка сообщения
channel.basic_publish(exchange='', routing_key='my_queue', body='Hello, RabbitMQ!')
print("Message sent")
connection.close()
7.4. JavaScript (Node.js) - библиотека amqplib. Пример использования:
const amqp = require('amqplib');
async function sendMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Создание очереди
const queue = 'my_queue';
await channel.assertQueue(queue, { durable: false });
// Отправка сообщения
const message = 'Hello, RabbitMQ!';
channel.sendToQueue(queue, Buffer.from(message));
console.log("Message sent");
setTimeout(() => {
connection.close();
}, 500);
}
sendMessage();
7.5. PHP - библиотека php-amqplib. Пример использования:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Создание очереди
$channel->queue_declare('my_queue', false, false, false, false);
// Отправка сообщения
$msg = new AMQPMessage('Hello, RabbitMQ!');
$channel->basic_publish($msg, '', 'my_queue');
echo "Message sent\n";
$channel->close();
$connection->close();
- Отправка и получение сообщений. Отправка сообщения (Producer) выполняется через обменник. Получение сообщения (Consumer) выполняется из очереди. Нужно подписаться на очередь и получить сообщение.
- Мониторинг. В веб-интерфейсе можно анализировать список очередей и статистику, а в разделе Exchanges можно увидеть обменники и их привязки.
- Масштабирование. Для масштабирования можно настроить кластер RabbitMQ. Но важно убедиться, что все узлы кластера имеют одинаковую версию RabbitMQ и Erlang.